package com.iteaj.iot.server.udp;

import com.iteaj.iot.*;
import com.iteaj.iot.codec.adapter.SocketMessageDecoderDelegation;
import com.iteaj.iot.config.ConnectProperties;
import com.iteaj.iot.server.SocketServerComponent;
import com.iteaj.iot.server.codec.UdpServerProtocolEncoder;
import com.iteaj.iot.server.handle.ProtocolBusinessHandler;
import com.iteaj.iot.udp.UdpProtocolException;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;

import java.util.List;
import java.util.Optional;

import static com.iteaj.iot.CoreConst.*;

/**
 * Base class for UDP server components.
 *
 * <p>Bootstraps a Netty {@link NioDatagramChannel}, installs the encoder, decoder,
 * event and business handlers on its pipeline, and writes outgoing messages through
 * the {@link UdpDeviceManager}. When an outgoing {@link UdpServerMessage} has no
 * recipient, the recipient is taken from the state the device manager holds for the
 * given device code.
 *
 * @param <M> the server message type handled by this component
 */
public abstract class UdpServerComponent<M extends UdpServerMessage> extends SocketServerComponent<M, DatagramPacket> {

    /**
     * Creates the component.
     *
     * @param connectProperties connection configuration passed to the superclass
     */
    public UdpServerComponent(ConnectProperties connectProperties) {
        super(connectProperties);
    }

    /** Delegates to the superclass implementation. */
    @Override
    public SocketMessage proxy(ChannelHandlerContext ctx, DatagramPacket in) throws Exception {
        return super.proxy(ctx, in);
    }

    /** Delegates to the superclass implementation. */
    @Override
    public List<? extends SocketMessage> decodes(ChannelHandlerContext ctx, DatagramPacket in) throws Exception {
        return super.decodes(ctx, in);
    }

    /**
     * Builds the datagram bootstrap for this component.
     *
     * <p>The bootstrap uses the worker group of {@link IotThreadManager} and a
     * {@link NioDatagramChannel}; {@link #initOptions(Bootstrap)} is applied before the
     * channel initializer is attached.
     *
     * @return the configured bootstrap
     */
    @Override
    public AbstractBootstrap initParentChannel() {
        final Bootstrap bootstrap = new Bootstrap()
                .group(IotThreadManager.instance().getWorkerGroup())
                .channel(NioDatagramChannel.class);

        return initOptions(bootstrap).handler(new ChannelInitializer<NioDatagramChannel>() {

            @Override
            protected void initChannel(NioDatagramChannel ch) throws Exception {
                // Hand the datagram channel to the device manager before wiring the pipeline.
                getDeviceManager().setChannel(ch);
                UdpServerComponent.this.doInitParentChannel(ch.pipeline());
            }
        });
    }

    /**
     * Extension point for customising bootstrap options; returns the bootstrap unchanged here.
     *
     * @param bootstrap the bootstrap being configured
     * @return the bootstrap to continue configuring
     */
    protected Bootstrap initOptions(Bootstrap bootstrap) {
        return bootstrap;
    }

    /**
     * Installs the encoder, decoder, event handler and business handler on the pipeline,
     * then calls {@code doInitChannel} for custom handlers.
     *
     * @param ch the pipeline of the datagram channel
     * @throws ProtocolException if {@code getMessageDecoder()} returns {@code null}
     */
    protected void doInitParentChannel(ChannelPipeline ch) {
        // Install the encoder.
        ch.addFirst(SERVER_ENCODER_HANDLER, UdpServerProtocolEncoder.getInstance());

        // The decoder is added at the head of the pipeline.
        final ChannelInboundHandlerAdapter decoder = getMessageDecoder();
        if (decoder == null) {
            throw new ProtocolException("未指定设备报文解码器：" + UdpServerComponent.this.getName());
        }

        // A delegating decoder with no delegate yet delegates to this component.
        if (decoder instanceof SocketMessageDecoderDelegation) {
            if (((SocketMessageDecoderDelegation<?>) decoder).getDelegation() == null) {
                // The raw-typed local is kept on purpose: it is passed to a wildcard-typed receiver.
                UdpServerComponent udpServerComponent = UdpServerComponent.this;
                ((SocketMessageDecoderDelegation<?>) decoder).setDelegation(udpServerComponent);
            }
        }

        ch.addFirst(SERVER_DECODER_HANDLER, decoder);

        // Event handler.
        ch.addLast(EVENT_MANAGER_HANDLER, UdpEventManagerHandler.getInstance());

        // The business handler is added at the tail of the pipeline.
        ch.addLast(SERVER_SERVICE_HANDLER, ProtocolBusinessHandler.getInstance());

        // Custom handlers.
        UdpServerComponent.this.doInitChannel(ch);
    }

    /**
     * Writes a message to the given device.
     *
     * <p>A {@link Protocol} is routed to {@link #writeAndFlush(String, Protocol)}. A
     * {@link Message} must be a {@link UdpServerMessage}; if it has no recipient, the
     * recipient is resolved from the device manager. Any other object is passed to the
     * device manager as is.
     *
     * @param equipCode the device code
     * @param msg       the protocol, message or other object to write
     * @param args      additional arguments
     * @return the write future, or a failed future carrying {@code NotDeviceException.DEFAULT}
     *         when a recipient is needed but the device manager has no state for the device
     * @throws UdpProtocolException if {@code msg} is a {@link Message} but not a {@link UdpServerMessage}
     */
    @Override
    public Optional<ChannelFuture> writeAndFlush(String equipCode, Object msg, Object... args) {
        if (msg instanceof Protocol) {
            return this.writeAndFlush(equipCode, (Protocol) msg);
        }

        if (msg instanceof Message) {
            final UdpServerMessage message = requireUdpServerMessage(msg);
            if (!resolveRecipient(equipCode, message)) {
                return unknownDeviceFuture();
            }
        }

        // NOTE(review): args is not forwarded to the device manager - confirm this is intended.
        return getDeviceManager().writeAndFlush(equipCode, msg);
    }

    /**
     * Writes the request message of a protocol to the given device.
     *
     * @param equipCode the device code
     * @param protocol  the protocol whose request message is written
     * @return the write future, or a failed future carrying {@code NotDeviceException.DEFAULT}
     *         when a recipient is needed but the device manager has no state for the device
     * @throws UdpProtocolException if the request message is not a {@link UdpServerMessage}
     */
    @Override
    public Optional<ChannelFuture> writeAndFlush(String equipCode, Protocol protocol) {
        final UdpServerMessage message = requireUdpServerMessage(protocol.requestMessage());
        if (!resolveRecipient(equipCode, message)) {
            return unknownDeviceFuture();
        }

        // Write the message out.
        return getDeviceManager().writeAndFlush(equipCode, protocol);
    }

    /** Returns {@code candidate} as a {@link UdpServerMessage}, or throws if it is any other type (or null). */
    private UdpServerMessage requireUdpServerMessage(Object candidate) {
        if (!(candidate instanceof UdpServerMessage)) {
            throw new UdpProtocolException("udp协议只支持服务端报文类型["+UdpServerMessage.class.getSimpleName()+"]");
        }

        return (UdpServerMessage) candidate;
    }

    /**
     * Ensures the message has a recipient, looking it up in the device manager when missing.
     *
     * @return {@code false} when a lookup was needed and the device manager has no state for the device
     */
    private boolean resolveRecipient(String equipCode, UdpServerMessage message) {
        if (message.getRecipient() != null) {
            return true;
        }

        UdpIdleState udpIdleState = getDeviceManager().find(equipCode);
        if (udpIdleState == null) {
            return false;
        }

        message.setRecipient(udpIdleState.getAddress());
        return true;
    }

    /** Builds the failed future returned when no state is found for the target device. */
    private Optional<ChannelFuture> unknownDeviceFuture() {
        return Optional.of(getChannel().newFailedFuture(NotDeviceException.DEFAULT));
    }

    /** Returns the superclass device manager cast to {@link UdpDeviceManager}. */
    @Override
    public UdpDeviceManager getDeviceManager() {
        return (UdpDeviceManager) super.getDeviceManager();
    }

    /** Creates the device manager through {@link FrameworkManager}. */
    @Override
    protected SocketDeviceManager createDeviceManager() {
        return (SocketDeviceManager) FrameworkManager.createDeviceManager(this);
    }

    /** Returns the superclass channel cast to {@link NioDatagramChannel}. */
    @Override
    protected NioDatagramChannel getChannel() {
        return (NioDatagramChannel) super.getChannel();
    }

    /** Returns {@link PortType#Udp}. */
    @Override
    public PortType getPortType() {
        return PortType.Udp;
    }

    /** Closes the server component, but only if it is currently started. */
    @Override
    public synchronized void close() {
        if (isStart()) {
            // Close the server component.
            super.close();
        }
    }
}
